/**
* Copyright © 2018-2019, by 晓叹星沉.
*/
package org.aurora.mq.core;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * Thin convenience wrapper around a RocketMQ {@link DefaultMQProducer} for sending messages.
 *
 * <p>
 * Each send method builds a {@link Message} from the given topic, tags, key and body and
 * delegates to the wrapped producer. This class never calls {@code start()} on the producer;
 * presumably the caller is expected to start it before sending (TODO confirm against the
 * code that constructs this class).
 * </p>
 *
 * @author 晓叹星沉
 * @since jdk1.8
 * 2019-07-10
 */
public class DefaultRocketMQProducer {

	/** Hash-based queue selector shared by all instances; used for orderly sending. */
	private static final MessageQueueSelector MESSAGE_QUEUE_SELECTOR = new SelectMessageQueueByHash();

	/** The underlying RocketMQ producer every operation is delegated to. */
	private final DefaultMQProducer producer;

	/**
	 * Creates a wrapper around the given producer.
	 *
	 * @param producer the RocketMQ producer to delegate to; must not be {@code null}
	 * @throws NullPointerException if {@code producer} is {@code null}
	 */
	public DefaultRocketMQProducer(DefaultMQProducer producer) {
		// Fail fast here instead of with an unexplained NPE on the first send/shutdown call.
		this.producer = java.util.Objects.requireNonNull(producer, "producer");
	}

	/**
	 * Sends a message and returns the result reported by the producer.
	 *
	 * @param topic the topic
	 * @param tags the message tag
	 * @param key the message key (documented by the original author as a globally unique identifier)
	 * @param body the message content
	 * @return the send result returned by the underlying producer
	 * @throws MQClientException if thrown by the underlying producer
	 * @throws RemotingException if thrown by the underlying producer
	 * @throws MQBrokerException if thrown by the underlying producer
	 * @throws InterruptedException if thrown by the underlying producer
	 */
	public SendResult send(String topic, String tags, String key, byte[] body)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return producer.send(initMsg(topic, tags, key, body));
	}

	/**
	 * Sends a message orderly, choosing the target queue with the shared hash-based selector.
	 *
	 * @param topic the topic
	 * @param tags the message tag
	 * @param key the message key (documented by the original author as a globally unique identifier)
	 * @param body the message content
	 * @param hashKey the argument handed to the queue selector; per the original documentation,
	 *        messages with the same {@code hashKey} are sent to the same queue
	 * @return the send result returned by the underlying producer
	 * @throws MQClientException if thrown by the underlying producer
	 * @throws RemotingException if thrown by the underlying producer
	 * @throws MQBrokerException if thrown by the underlying producer
	 * @throws InterruptedException if thrown by the underlying producer
	 */
	public SendResult sendOrderly(String topic, String tags, String key, byte[] body, String hashKey)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return producer.send(initMsg(topic, tags, key, body), MESSAGE_QUEUE_SELECTOR, hashKey);
	}

	/**
	 * Sends a message asynchronously; the outcome is delivered to the given callback.
	 *
	 * @param topic the topic
	 * @param tags the message tag
	 * @param key the message key (documented by the original author as a globally unique identifier)
	 * @param body the message content
	 * @param sendCallback the callback passed to the underlying producer
	 * @throws MQClientException if thrown by the underlying producer
	 * @throws RemotingException if thrown by the underlying producer
	 * @throws MQBrokerException part of the declared signature; kept so that existing callers'
	 *         catch clauses keep compiling
	 * @throws InterruptedException if thrown by the underlying producer
	 */
	public void asyncSend(String topic, String tags, String key, byte[] body, SendCallback sendCallback)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		producer.send(initMsg(topic, tags, key, body), sendCallback);
	}

	/**
	 * Builds the {@link Message} to send from its individual parts.
	 *
	 * @param topic the topic
	 * @param tags the message tag
	 * @param key the message key
	 * @param body the message content
	 * @return a new message carrying the given values
	 */
	private Message initMsg(String topic, String tags, String key, byte[] body) {
		return new Message(topic, tags, key, body);
	}

	/**
	 * Shuts down the underlying producer.
	 */
	public void shutdown() {
		producer.shutdown();
	}

}
